/*
* @Date: 2021/2/13
* @Author: XueChengwu <xuechengwu@erayt.com>
* @Copyright: 2015-2019 Erayt, Inc.
* @Description: If you have some questions, please contact: xuechengwu@erayt.com.
*/
import WebSocket from 'ws';
import config from './config';
import rabbitmq from './core/rabbitmq';
import get from '../utils/get';
import utils from './core/utils';
import kMean from '../brain/kmean';
import Markvo from '../brain/markvo';
import fs from 'fs';
import path from 'path';

let wss = new WebSocket.Server({ port: config.socket.port });
let titleClients = {};
wss.on('connection', function(ws) {
  ws.on('message', function(data) {
    const commands = data.split(':');
    if (commands.length > 0) {
      const command = commands[0];
      const titles = commands[1].split(',');
      for (let i = 0; i < titles.length; i++) {
        const title = titles[i];
        if (!titleClients[title]) {
          titleClients[title] = [];
        }
        if (command === 'A') {
          if (titleClients[title].findIndex(vo => vo === ws) === -1) {
            console.log('title===>', title);
            titleClients[title].push(ws);
          }
        } else if (command === 'D'){
          const index = titleClients[title].findIndex(ws);
          if (index >= 0) {
            titleClients[title].splice(index, 1);
          }
        }
      }
    }
  });
  ws.on('close', function() {
    console.log('ws close');
    for (let key in titleClients) {
      const tIndex = titleClients[key].findIndex(vo => vo === ws);
      if (tIndex >= 0) {
        titleClients[key].splice(tIndex, 1);
      }
    }
  });
  ws.on('error', function() {
    console.log('ws error');
    for (let key in titleClients) {
      const tIndex = titleClients[key].findIndex(vo => vo === ws);
      if (tIndex >= 0) {
        titleClients[key].splice(tIndex, 1);
      }
    }
  });
});

function getQuotation(symbol) {
  return new Promise((resolve, reject) => {
    const end = utils.formatDate(new Date(), 'yyyyMMdd');
    get('https://push2his.eastmoney.com/api/qt/stock/kline/get', {
      secid: symbol,
      klt: '101',
      fqt: 0,
      lmt: 2880,
      end,
      iscca: 1,
      fields1: 'f1,f2,f3,f4,f5',
      fields2: 'f51,f52,f53,f54,f55,f56,f57',
      ut:'f057cbcbce2a86e2866ab8877db1d059',
      forcect: 1
    }).then(res => {
      const result = JSON.parse(res.text);
      let ret = [];
      if(result.rc === 0) {
        let lines = result.data.klines;
        ret = lines.map((item) => {
          const bar = item.split(',');
          return {
            datetime: new Date(bar[0]).getTime(),
            open: parseFloat(bar[1]),
            close: parseFloat(bar[2]),
            high: parseFloat(bar[3]),
            low: parseFloat(bar[4]),
            volume: parseFloat(bar[5])
          }
        });
        resolve(ret);
      } else {
        resolve(ret);
      }
    });
  });
}
rabbitmq.getChannel().then(ch => {
  console.log('calMarkvo...');
  ch.assertExchange('calMarkvo', 'direct', { durable: false });
  ch.assertQueue('', { exclusive: true }).then(q => {
    ch.bindQueue(q.queue, 'calMarkvo', 'info');
    ch.consume(q.queue, msg => {
      const content = msg.content.toString();
      Promise.all([
        getQuotation('122.XAU'),
        getQuotation('119.USDJPY'),
        getQuotation('119.USDCHF'),
        getQuotation('119.USDCAD'),
        getQuotation('119.EURUSD'),
        getQuotation('119.GBPUSD'),
        getQuotation('119.AUDUSD'),
        getQuotation('119.NZDUSD'),
      ]).then(result => {
        const xauClose = result[0].map(vo => vo.close);
        const xauDiff = [];
        for (let i = 0; i < xauClose.length - 1; i++) {
          xauDiff.push((xauClose[i + 1] - xauClose[i]) / xauClose[i] * 100);
        }
        const jpyClose = result[1].map(vo => vo.close);
        const jpyDiff = [];
        for (let i = 0; i < jpyClose.length - 1; i++) {
          jpyDiff.push((jpyClose[i + 1] - jpyClose[i]) / jpyClose[i] * 100);
        }
        const chfClose = result[2].map(vo => vo.close);
        const chfDiff = [];
        for (let i = 0; i < chfClose.length - 1; i++) {
          chfDiff.push((chfClose[i + 1] - chfClose[i]) / chfClose[i] * 100);
        }
        const cadClose = result[3].map(vo => vo.close);
        const cadDiff = [];
        for (let i = 0; i < cadClose.length - 1; i++) {
          cadDiff.push((cadClose[i + 1] - cadClose[i]) / cadClose[i] * 100);
        }
        const eurClose = result[4].map(vo => vo.close);
        const eurDiff = [];
        for (let i = 0; i < eurClose.length - 1; i++) {
          eurDiff.push((eurClose[i + 1] - eurClose[i]) / eurClose[i] * 100);
        }
        const gbpClose = result[5].map(vo => vo.close);
        const gbpDiff = [];
        for (let i = 0; i < gbpClose.length - 1; i++) {
          gbpDiff.push((gbpClose[i + 1] - gbpClose[i]) / gbpClose[i] * 100);
        }
        const audClose = result[6].map(vo => vo.close);
        const audDiff = [];
        for (let i = 0; i < audClose.length - 1; i++) {
          audDiff.push((audClose[i + 1] - audClose[i]) / audClose[i] * 100);
        }
        const nzdClose = result[7].map(vo => vo.close);
        const nzdDiff = [];
        for (let i = 0; i < nzdClose.length - 1; i++) {
          nzdDiff.push((nzdClose[i + 1] - nzdClose[i]) / nzdClose[i] * 100);
        }
        const minLen = Math.min(
          xauDiff.length,
          jpyDiff.length,
          cadDiff.length,
          chfDiff.length,
          gbpDiff.length,
          eurDiff.length,
          audDiff.length,
          nzdDiff.length,
        );
        const vector = [];

        for (let i = 0; i < minLen; i++) {
          vector.unshift(
            [
              xauDiff[xauDiff.length - 1 - i],
              jpyDiff[jpyDiff.length - 1 - i],
              chfDiff[chfDiff.length - 1 - i],
              cadDiff[cadDiff.length - 1 - i],
              eurDiff[eurDiff.length - 1 - i],
              gbpDiff[gbpDiff.length - 1 - i],
              audDiff[audDiff.length - 1 - i],
              nzdDiff[nzdDiff.length - 1 - i],
            ]
          );
        }
        // console.log('min len ===>', minLen, vector);
        const { centroids, cluster } = kMean(vector, 32);
        const markvo = new Markvo(32);
        markvo.train(cluster, 40);
        fs.writeFileSync(
          path.resolve(__dirname, `./static/brain/markvo.json`),
          JSON.stringify({
            centroids,
            markvo: markvo.matrix,
          }),
        );
        if (titleClients['MARKVO']) {
          for (let i = 0; i < titleClients['MARKVO'].length; i++) {
            const ws = titleClients['MARKVO'][i];
            ws.send(JSON.stringify({ code: 0, msg: '计算成功' }));
          }
        }
      }).catch(err => {
        console.log('error===>', err);
        if (titleClients['MARKVO']) {
          for (let i = 0; i < titleClients['MARKVO'].length; i++) {
            const ws = titleClients['MARKVO'][i];
            ws.send(JSON.stringify({ code: 1, msg: '计算失败' }));
          }
        }
      });
    }, { noAck: false });
  }).catch(err => {
    console.log('calc markvo error', err);
  });
}).catch(err => {
  console.log('getChannel===>', err);
});
rabbitmq.getChannel().then(ch => {
  ch.assertExchange('calBayes', 'direct', { durable: false });
  ch.assertQueue('', { exclusive: true }).then(q => {
    ch.bindQueue(q.queue, 'calBayes', 'info');
    ch.consume(q.queue, msg => {

    }, { noAck: false });
  }).catch(err => {
    console.log('calc bayes error', err);
  });
}).catch(err => {
  console.log('getChannel===>', err);
});
rabbitmq.getChannel().then(ch => {
  ch.assertExchange('quotation', 'direct', { durable: false });
  ch.assertQueue('', { exclusive: true }).then(q => {
    ch.bindQueue(q.queue, 'quotation', 'info');
    ch.consume(q.queue, msg => {
      const content = msg.content.toString();
      const cmds = content.split(':');
      if (cmds.length > 0) {
        const title = cmds[0];
        if (titleClients[title]) {
          for (let i = 0; i < titleClients[title].length; i++) {
            const ws = titleClients[title][i];
            ws.send(content);
          }
        }
      }
      ch.ack(msg);
    }, { noAck: false });
  }).catch(err => {
    console.log('socket error', err);
  });
}).catch(err => {
  console.log('getChannel===>', err);
});

